package gokafka

import (
	"context"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	log "github.com/sirupsen/logrus"
)

type GroupConsumer struct {
	Name      string
	client    *sarama.Client
	consumers map[string]sarama.ConsumerGroup
	Handlers  map[string]*ComsumerGroupHandler
}

func NewConsumer(name string, kafkaUrl string) (*GroupConsumer, error) {
	conf := sarama.NewConfig()
	conf.Consumer.MaxProcessingTime = 30 * time.Second
	conf.Consumer.Offsets.Initial = sarama.OffsetOldest
	conf.Consumer.Return.Errors = true

	conf.ClientID = name
	conf.Version = sarama.V2_0_0_0

	consumer := GroupConsumer{}
	client, err := sarama.NewClient(strings.Split(kafkaUrl, ","), conf)
	if err != nil {
		log.Error("Create kafka consumer client failed:", err)
		return nil, err
	}

	consumer.client = &client
	consumer.consumers = make(map[string]sarama.ConsumerGroup)
	consumer.Handlers = make(map[string]*ComsumerGroupHandler)
	return &consumer, nil
}

func (c *GroupConsumer) AddGroup(groupId string, handler ComsumerHandler) error {
	groupConsumer, err := sarama.NewConsumerGroupFromClient(groupId, *c.client)
	if err != nil {
		log.Error("Create group consumer from client failed:", err)
		return err
	}

	if groupConsumer == nil {
		log.Error("Create group consumer from client nil")
		return nil
	}
	c.consumers[groupId] = groupConsumer
	gouphandler := ComsumerGroupHandler{
		Handler: handler,
	}
	c.Handlers[groupId] = &gouphandler
	return nil
}
func (c *GroupConsumer) ListGroup() []string {
	var list []string
	for groupId, _ := range c.consumers {
		list = append(list, groupId)
	}
	return list
}

func (c *GroupConsumer) ConsumeTopicsByGroup(ctx context.Context, groupId string, topics []string) {
	_, ok := c.consumers[groupId]
	if !ok {
		log.Error("GroupId note exit,", groupId)
		return
	}

	for {
		err := c.consumers[groupId].Consume(ctx, topics, *c.Handlers[groupId])
		if err == sarama.ErrClosedConsumerGroup {
			return
		} else if err != nil {
			log.Error("consumer error:", err)
			//return
		}
	}
}

func (c *GroupConsumer) ConsumeTopic(ctx context.Context, topic string, handler ComsumerHandler) {
	groupId := topic + "_goup"
	err := c.AddGroup(groupId, handler)
	if err != nil {
		log.Error("ConsumeTopic failed,", err)
		return
	}

	var topics []string
	topics = append(topics, topic)
	c.ConsumeTopicsByGroup(ctx, groupId, topics)
}

func (c *GroupConsumer) Close() {
	for _, consumer := range c.consumers {
		consumer.Close()
	}
}

func (c *GroupConsumer) CloseConsumer(groupId string) {
	c.consumers[groupId].Close()
}
